{#
  Template for 'response_codec_response_test.cc'.

  Provides integration tests using Kafka codec.
  The tests do the following:
  - create the message,
  - serialize the message into buffer,
  - pass the buffer to the codec,
  - capture messages received in callback,
  - verify that captured messages are identical to the ones sent.
#}
#include "contrib/kafka/filters/network/source/external/responses.h"
#include "contrib/kafka/filters/network/source/response_codec.h"

#include "contrib/kafka/filters/network/test/buffer_based_test.h"
#include "contrib/kafka/filters/network/test/serialization_utilities.h"

#include "gtest/gtest.h"

namespace Envoy {
namespace Extensions {
namespace NetworkFilters {
namespace Kafka {
namespace ResponseCodecResponseTest {

// Fixture shared by every generated response codec test in this file.
// Combines the gtest fixture base with MessageBasedTest specialised for ResponseEncoder.
class ResponseCodecResponseTest : public testing::Test,
                                  public MessageBasedTest<ResponseEncoder> {};

// Callback type handed to the decoder in the tests below; the responses it captures are read
// back through getCapturedMessages().
using ResponseCapturingCallback =
    CapturingCallback<ResponseCallback, AbstractResponseSharedPtr, ResponseMetadataSharedPtr>;

{% for message_type in message_types %}

// Integration test for {{ message_type.name }} messages.
//
// For each field list (one per version) computed for this message type, 100 responses are
// serialized into the shared buffer. The decoder is then fed the whole buffer in one call, and
// the captured responses are compared, in order, against the ones that were sent.

TEST_F(ResponseCodecResponseTest, ShouldHandle{{ message_type.name }}Messages) {
  // given
  const auto callback = std::make_shared<ResponseCapturingCallback>();
  ResponseDecoder testee{ {callback} };

  using ResponseUnderTest = Response<{{ message_type.name }}>;

  std::vector<ResponseUnderTest> sent;
  // Incremented before every message, so after the loops it equals the number of messages sent.
  int32_t correlation_id = 0;

  {% for field_list in message_type.compute_field_lists() %}
  for (int i = 0; i < 100; ++i) {
    {# Response header cannot contain tagged fields if response does not support them. #}
    const TaggedFields tagged_fields = responseUsesTaggedFieldsInHeader(
      {{ message_type.get_extra('api_key') }}, {{ field_list.version }}) ?
        TaggedFields{ { TaggedField{ 10, Bytes{1, 2, 3, 4} } } }:
        TaggedFields({});
    const ResponseMetadata metadata = {
      {{ message_type.get_extra('api_key') }},
      {{ field_list.version }},
      ++correlation_id,
      tagged_fields,
    };
    const {{ message_type.name }} data = { {{ field_list.example_value() }} };
    const ResponseUnderTest response = {metadata, data};
    putMessageIntoBuffer(response);
    // Tell the decoder which api key / version to expect for this correlation id before any
    // data is pushed through it.
    testee.expectResponse(
      correlation_id, {{ message_type.get_extra('api_key') }}, {{ field_list.version }});
    sent.push_back(response);
  }
  {% endfor %}

  // when
  testee.onData(buffer_);

  // then
  const std::vector<AbstractResponseSharedPtr>& received = callback->getCapturedMessages();
  ASSERT_EQ(received.size(), sent.size());
  // correlation_id is never negative here; cast explicitly so the comparison against the
  // unsigned container size does not mix signed and unsigned operands.
  ASSERT_EQ(received.size(), static_cast<size_t>(correlation_id));

  for (size_t i = 0; i < received.size(); ++i) {
    // Captured messages are type-erased; recover the concrete response type before comparing.
    const std::shared_ptr<ResponseUnderTest> response =
      std::dynamic_pointer_cast<ResponseUnderTest>(received[i]);
    ASSERT_NE(response, nullptr);
    ASSERT_EQ(*response, sent[i]);
  }
}
{% endfor %}

} // namespace ResponseCodecResponseTest
} // namespace Kafka
} // namespace NetworkFilters
} // namespace Extensions
} // namespace Envoy
